<?php

/*
  PHP memcache 队列类
  @author LKK/lianq.net
  @create on 19:22 2014/3/24
  @version 0.3
  @example:
  $obj = new memcacheQueue('PT');
  $aaa = mt_rand(1, 9999);
  $obj->add("$aaa"); //添加队列
  $obj->getQueueLength(); //获取队列总数
  $obj->read(1); // peek at the first item; nothing is removed from the queue
  sleep(10);
  $a1 = $obj->get(1); // dequeue the first item (it is fetched and then deleted)
  $a2 = implode(',', $a1);
  $a3 = explode(',', $a2);
  echo $a3[0];
  //$obj->memFlush();
 *
 */

/**
 * A simple queue stored in memcache.
 *
 * Layout in memcache (all keys are prefixed with the queue name):
 *   - "<name>_lkkQueueHead"    index of the last item already consumed from the front
 *   - "<name>_lkkQueueTail"    index of the last item appended
 *   - "<name>_lkkQueueValu_N"  payload of item N
 *   - "<name>_lkkQueueLock_"   advisory lock, taken with memcache_add()
 *
 * Live items therefore occupy the indexes head+1 .. tail (inclusive), and the
 * queue is empty when head equals tail. Items can silently disappear when their
 * TTL runs out, so the reported length is an upper bound on the real one.
 */
class memcacheQueue {

	/** @var mixed Memcache connection shared by every instance; false when connecting failed. */
	public static $client;

	/** @var bool True while this instance holds the queue lock. */
	public $access;

	/** @var int TTL in seconds applied to counters, items and the lock (memcache caps this at 30 days). */
	private $expire;

	/** @var int Microseconds to sleep between two attempts to take the lock. */
	private $sleepTime;

	/** @var string Queue name, used as the prefix of every key. */
	private $queueName;

	/** @var int Maximum number of lock retries before giving up. */
	private $retryNum;

	/** @var int Cached head counter (index of the last consumed item). */
	public $currentHead;

	/** @var int Cached tail counter (index of the last appended item). */
	public $currentTail;

	// These three used to be created dynamically in the constructor, which is
	// deprecated since PHP 8.2. They stay public because dynamic properties
	// were publicly readable.

	/** @var string Memcache key holding the head counter. */
	public $head_key;

	/** @var string Memcache key holding the tail counter. */
	public $tail_key;

	/** @var string Memcache key used as the lock. */
	public $lock_key;

	const MAXNUM = 50000; // maximum number of queued items (keeping it under 10K is recommended)
	const HEAD_KEY = '_lkkQueueHead'; // suffix of the head counter key
	const TAIL_KEY = '_lkkQueueTail'; // suffix of the tail counter key
	const VALU_KEY = '_lkkQueueValu_'; // prefix of the per-item value keys
	const LOCK_KEY = '_lkkQueueLock_'; // suffix of the lock key

	/**
	 * Connects to memcache and loads the current head/tail counters.
	 *
	 * When the connection cannot be established the object is still fully
	 * initialised; every queue operation then reports failure (false) or an
	 * empty queue instead of calling memcache with an invalid handle.
	 *
	 * @param string $queueName Queue name (key prefix).
	 * @param int $expire TTL in seconds; 0/empty selects 3600.
	 * @param array|string $config array('host' => ..., 'port' => ...), a "host:port"
	 *                             string, or empty for 127.0.0.1:11211.
	 */
	public function __construct($queueName = '', $expire = 0, $config = '') {
		// Set up all local state first so a failed connection cannot leave
		// the instance with missing properties.
		$this->access = false;
		$this->sleepTime = 1000;
		$this->expire = empty($expire) ? 3600 : intval($expire) + 1;
		$this->queueName = $queueName;
		$this->retryNum = 1000;
		$this->currentHead = 0;
		$this->currentTail = 0;

		$this->head_key = $this->queueName . self::HEAD_KEY;
		$this->tail_key = $this->queueName . self::TAIL_KEY;
		$this->lock_key = $this->queueName . self::LOCK_KEY;

		if (empty($config)) {
			self::$client = memcache_pconnect('127.0.0.1', 11211);
		} elseif (is_array($config)) { // array('host' => '127.0.0.1', 'port' => '11211')
			$host = isset($config['host']) ? $config['host'] : '127.0.0.1';
			$port = isset($config['port']) ? $config['port'] : 11211;
			self::$client = memcache_pconnect($host, intval($port));
		} elseif (is_string($config)) { // "127.0.0.1:11211"
			$parts = explode(':', $config);
			$host = ($parts[0] !== '') ? $parts[0] : '127.0.0.1';
			$port = isset($parts[1]) ? $parts[1] : 11211;
			self::$client = memcache_pconnect($host, intval($port));
		}
		if (!self::$client) {
			return;
		}

		ignore_user_abort(true); // keep running when the HTTP client disconnects
		set_time_limit(0); // lift the script execution time limit

		$this->_initSetHeadNTail();
	}

	/**
	 * Reads one counter from memcache as an integer.
	 *
	 * Depending on the extension version counters come back as int or as
	 * numeric string; casting keeps the strict comparisons below reliable.
	 *
	 * @param string $key Counter key.
	 * @return int Counter value, 0 when the key is missing or there is no connection.
	 */
	private function _readCounter($key) {
		if (!self::$client) {
			return 0;
		}
		$value = memcache_get(self::$client, $key);
		return ($value === false) ? 0 : intval($value);
	}

	/**
	 * Refreshes the cached head and tail counters from memcache.
	 */
	private function _initSetHeadNTail() {
		$this->currentHead = $this->_readCounter($this->head_key);
		$this->currentTail = $this->_readCounter($this->tail_key);
	}

	/**
	 * Advances the head counter after items were consumed from the front.
	 * The head never moves past the tail.
	 *
	 * @param int $step Number of items consumed.
	 */
	private function _changeHead($step = 1) {
		if ($this->currentTail - $this->currentHead < $step) {
			$this->currentHead = $this->currentTail;
		} else {
			$this->currentHead += $step;
		}

		memcache_set(self::$client, $this->head_key, $this->currentHead, 0, $this->expire);
	}

	/**
	 * Moves the tail counter after items were appended or consumed from the back.
	 *
	 * @param int $step Number of items.
	 * @param bool $reverse True to move the tail backwards (items taken from the back).
	 * @return null
	 */
	private function _changeTail($step = 1, $reverse = false) {
		if (!$reverse) {
			$this->currentTail += $step;
		} else {
			// Never let the tail drop below the head: that would yield a
			// negative queue length and corrupt later operations.
			$this->currentTail = max($this->currentHead, $this->currentTail - $step);
		}

		memcache_set(self::$client, $this->tail_key, $this->currentTail, 0, $this->expire);
	}

	/**
	 * Tells whether the cached counters describe an empty queue.
	 *
	 * @return bool
	 */
	private function _isEmpty() {
		return $this->currentHead === $this->currentTail;
	}

	/**
	 * Tells whether the cached counters describe a full queue.
	 *
	 * @return bool
	 */
	private function _isFull() {
		return ($this->currentTail - $this->currentHead) >= self::MAXNUM;
	}

	/**
	 * Takes the queue lock, retrying up to $retryNum times.
	 * On success the cached counters are refreshed while the lock is held.
	 *
	 * Note: the lock key is stored with the queue TTL, so a process that dies
	 * while holding it blocks the queue until that TTL runs out.
	 *
	 * @return bool True when the lock is (already) held by this instance.
	 */
	private function _getLock() {
		if ($this->access === false) {
			if (!self::$client) {
				return false;
			}

			$attempts = 0;
			// memcache_add() only succeeds when the key does not exist yet,
			// which makes it usable as a test-and-set primitive.
			while (!memcache_add(self::$client, $this->lock_key, 1, 0, $this->expire)) {
				if (++$attempts > $this->retryNum) {
					return false;
				}
				usleep($this->sleepTime);
			}

			$this->_initSetHeadNTail();
			return $this->access = true;
		}

		return $this->access;
	}

	/**
	 * Releases the queue lock.
	 */
	private function _unLock() {
		memcache_delete(self::$client, $this->lock_key, 0);
		$this->access = false;
	}

	/**
	 * Validates and normalises a caller supplied length.
	 *
	 * @param mixed $length Requested length; negative values mean "from the back".
	 * @return int|false Non-zero integer (0 becomes MAXNUM, i.e. "everything"),
	 *                   or false when $length is not numeric.
	 */
	private function _normalizeLength($length) {
		if (!is_numeric($length)) {
			return false;
		}
		$length = intval($length);
		return ($length === 0) ? self::MAXNUM : $length;
	}

	/**
	 * Builds the value keys addressed by a request, restricted to the
	 * indexes that can actually hold items (head+1 .. tail).
	 *
	 * @param int $length Positive: that many items starting at the front.
	 *                    Negative: that many items starting at the back.
	 * @return array List of memcache keys in retrieval order.
	 */
	private function _collectKeys($length) {
		$prefix = $this->queueName . self::VALU_KEY;
		$keys = array();

		if ($length > 0) { // forward: from the front towards the back
			$last = min($this->currentHead + $length, $this->currentTail);
			for ($i = $this->currentHead + 1; $i <= $last; $i++) {
				$keys[] = $prefix . $i;
			}
		} else { // backward: from the back towards the front
			$stop = max($this->currentTail + $length, $this->currentHead);
			for ($i = $this->currentTail; $i > $stop; $i--) {
				$keys[] = $prefix . $i;
			}
		}

		return $keys;
	}

	/**
	 * Fetches several items in a single round trip.
	 *
	 * @param array $keys Value keys to fetch.
	 * @return array|false Found items keyed by memcache key, or whatever
	 *                     memcache_get() reports on failure.
	 */
	private function _fetch($keys) {
		if (empty($keys)) {
			return array();
		}
		// Best effort by design: expired items are simply absent from the result.
		return @memcache_get(self::$client, $keys);
	}

	/**
	 * Returns the current queue length.
	 * This is the theoretical length: items lost to expiry are still counted,
	 * so the real length is less than or equal to it.
	 *
	 * @return int
	 */
	public function getQueueLength() {
		$this->_initSetHeadNTail();
		return intval($this->currentTail - $this->currentHead);
	}

	/**
	 * Appends one item to the back of the queue.
	 *
	 * @param mixed $data Payload to store.
	 * @return bool False when the lock could not be taken, the queue is full
	 *              or memcache refused the value.
	 */
	public function add($data) {
		if (!$this->_getLock())
			return false;

		if ($this->_isFull()) {
			$this->_unLock();
			return false;
		}

		$value_key = $this->queueName . self::VALU_KEY . strval($this->currentTail + 1);
		$result = memcache_set(self::$client, $value_key, $data, MEMCACHE_COMPRESSED, $this->expire);
		if ($result) {
			// Only publish the new tail once the payload is really stored.
			$this->_changeTail();
		}

		$this->_unLock();
		return $result;
	}

	/**
	 * Reads items without removing them (no lock is taken).
	 *
	 * @param int $length Number of items; negative reads from the back, 0 reads everything.
	 * @return array|false Items keyed by memcache key; false when $length is
	 *                     not numeric or the queue is empty.
	 */
	public function read($length = 0) {
		$length = $this->_normalizeLength($length);
		if ($length === false)
			return false;
		$this->_initSetHeadNTail();

		if ($this->_isEmpty()) {
			return false;
		}

		return $this->_fetch($this->_collectKeys($length));
	}

	/**
	 * Removes items from the queue and returns them.
	 *
	 * @param int $length Number of items; negative takes from the back, 0 takes everything.
	 * @return array|false Items keyed by memcache key; false when $length is
	 *                     not numeric, the lock could not be taken or the queue is empty.
	 */
	public function get($length = 0) {
		$length = $this->_normalizeLength($length);
		if ($length === false)
			return false;
		if (!$this->_getLock())
			return false;

		if ($this->_isEmpty()) {
			$this->_unLock();
			return false;
		}

		$keyArr = $this->_collectKeys($length);
		$result = $this->_fetch($keyArr);

		// The counters always move, even for items that had already expired,
		// otherwise expired entries would block the queue forever.
		if ($length > 0) {
			$this->_changeHead($length);
		} else {
			$this->_changeTail(abs($length), true);
		}

		foreach ($keyArr as $v) { // delete what has been handed out
			@memcache_delete(self::$client, $v, 0);
		}

		$this->_unLock();

		return $result;
	}

	/**
	 * Empties the queue: deletes every live item and resets both counters to 0.
	 *
	 * @return bool True when the queue was cleared; false when the lock could
	 *              not be taken or the queue was already empty.
	 */
	public function clear() {
		if (!$this->_getLock())
			return false;

		if ($this->_isEmpty()) {
			$this->_unLock();
			return false;
		}

		$prefix = $this->queueName . self::VALU_KEY;
		for ($i = $this->currentHead + 1; $i <= $this->currentTail; $i++) {
			@memcache_delete(self::$client, $prefix . $i, 0);
		}

		$this->currentTail = $this->currentHead = 0;
		memcache_set(self::$client, $this->head_key, $this->currentHead, 0, $this->expire);
		memcache_set(self::$client, $this->tail_key, $this->currentTail, 0, $this->expire);

		$this->_unLock();
		return true;
	}

	/**
	 * Flushes the WHOLE memcache server, not only this queue.
	 *
	 * @return bool Result of memcache_flush(); false when there is no connection.
	 */
	public function memFlush() {
		if (!self::$client) {
			return false;
		}
		return memcache_flush(self::$client);
	}

}
